1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.*;
19 import static org.mockito.Mockito.*;
20
21 import java.util.Arrays;
22 import java.util.concurrent.TimeUnit;
23
24 import org.junit.Test;
25
26 import rx.*;
27 import rx.functions.Func1;
28
29 public class OperatorAllTest {
30
31 @Test
32 @SuppressWarnings("unchecked")
33 public void testAll() {
34 Observable<String> obs = Observable.just("one", "two", "six");
35
36 Observer<Boolean> observer = mock(Observer.class);
37 obs.all(new Func1<String, Boolean>() {
38 @Override
39 public Boolean call(String s) {
40 return s.length() == 3;
41 }
42 }).subscribe(observer);
43
44 verify(observer).onNext(true);
45 verify(observer).onCompleted();
46 verifyNoMoreInteractions(observer);
47 }
48
49 @Test
50 @SuppressWarnings("unchecked")
51 public void testNotAll() {
52 Observable<String> obs = Observable.just("one", "two", "three", "six");
53
54 Observer<Boolean> observer = mock(Observer.class);
55 obs.all(new Func1<String, Boolean>() {
56 @Override
57 public Boolean call(String s) {
58 return s.length() == 3;
59 }
60 }).subscribe(observer);
61
62 verify(observer).onNext(false);
63 verify(observer).onCompleted();
64 verifyNoMoreInteractions(observer);
65 }
66
67 @Test
68 @SuppressWarnings("unchecked")
69 public void testEmpty() {
70 Observable<String> obs = Observable.empty();
71
72 Observer<Boolean> observer = mock(Observer.class);
73 obs.all(new Func1<String, Boolean>() {
74 @Override
75 public Boolean call(String s) {
76 return s.length() == 3;
77 }
78 }).subscribe(observer);
79
80 verify(observer).onNext(true);
81 verify(observer).onCompleted();
82 verifyNoMoreInteractions(observer);
83 }
84
85 @Test
86 @SuppressWarnings("unchecked")
87 public void testError() {
88 Throwable error = new Throwable();
89 Observable<String> obs = Observable.error(error);
90
91 Observer<Boolean> observer = mock(Observer.class);
92 obs.all(new Func1<String, Boolean>() {
93 @Override
94 public Boolean call(String s) {
95 return s.length() == 3;
96 }
97 }).subscribe(observer);
98
99 verify(observer).onError(error);
100 verifyNoMoreInteractions(observer);
101 }
102
103 @Test
104 public void testFollowingFirst() {
105 Observable<Integer> o = Observable.from(Arrays.asList(1, 3, 5, 6));
106 Observable<Boolean> allOdd = o.all(new Func1<Integer, Boolean>() {
107 @Override
108 public Boolean call(Integer i) {
109 return i % 2 == 1;
110 }
111 });
112 assertFalse(allOdd.toBlocking().first());
113 }
114 @Test(timeout = 5000)
115 public void testIssue1935NoUnsubscribeDownstream() {
116 Observable<Integer> source = Observable.just(1)
117 .all(new Func1<Object, Boolean>() {
118 @Override
119 public Boolean call(Object t1) {
120 return false;
121 }
122 })
123 .flatMap(new Func1<Boolean, Observable<Integer>>() {
124 @Override
125 public Observable<Integer> call(Boolean t1) {
126 return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
127 }
128 });
129 assertEquals((Object)2, source.toBlocking().first());
130 }
131 }